/*
 * Copyright 2017 StreamSets Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.streamsets.datacollector.bundles;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.streamsets.datacollector.blobstore.BlobStoreTask;
import com.streamsets.datacollector.execution.PipelineStateStore;
import com.streamsets.datacollector.execution.SnapshotStore;
import com.streamsets.datacollector.json.ObjectMapperFactory;
import com.streamsets.datacollector.main.BuildInfo;
import com.streamsets.datacollector.main.RuntimeInfo;
import com.streamsets.datacollector.store.PipelineStoreTask;
import com.streamsets.datacollector.task.AbstractTask;
import com.streamsets.datacollector.usagestats.StatsCollector;
import com.streamsets.datacollector.util.Configuration;
import com.streamsets.pipeline.api.impl.Utils;
import com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService;
import org.apache.commons.lang3.ArrayUtils;
import org.cloudera.log4j.redactor.StringRedactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Named;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
 * Main manager that is taking care of bundle creation.
 */
public class SupportBundleManager extends AbstractTask implements BundleContext {

  // Class-wide logger.
  private static final Logger LOG = LoggerFactory.getLogger(SupportBundleManager.class);

  /**
   * Executor service for generating new bundles.
   *
   * Bundles are generated on a thread of this executor and piped out to the caller of generateNewBundle().
   */
  private final ExecutorService executor;

  // Collaborators received through the constructor; each one is handed out unchanged by the
  // corresponding BundleContext getter further down in this class.
  private final Configuration configuration;
  private final PipelineStoreTask pipelineStore;
  private final PipelineStateStore stateStore;
  private final SnapshotStore snapshotStore;
  private final BlobStoreTask blobStore;
  private final RuntimeInfo runtimeInfo;
  private final BuildInfo buildInfo;
  private final StatsCollector statsCollector;

  /**
   * Auto-discovered content generators keyed by their definition.
   *
   * Populated by initTask(), which also re-sorts the map by the order declared in each definition.
   */
  private LinkedHashMap<BundleContentGeneratorDefinition, BundleContentGenerator> generators;

  /**
   * Redactor used to remove sensitive data from text written into a bundle; created by initTask().
   */
  private StringRedactor redactor;

  /**
   * Creates the manager and remembers all injected collaborators.
   *
   * Only field assignments happen here; generator discovery and redactor creation are done by initTask().
   */
  @Inject
  public SupportBundleManager(
    @Named("supportBundleExecutor") SafeScheduledExecutorService executor,
    Configuration configuration,
    PipelineStoreTask pipelineStore,
    PipelineStateStore stateStore,
    SnapshotStore snapshotStore,
    BlobStoreTask blobStore,
    RuntimeInfo runtimeInfo,
    BuildInfo buildInfo,
    StatsCollector statsCollector
  ) {
    super("Support Bundle Manager");

    // Environment description
    this.buildInfo = buildInfo;
    this.runtimeInfo = runtimeInfo;
    this.configuration = configuration;

    // Stores exposed to the content generators
    this.pipelineStore = pipelineStore;
    this.stateStore = stateStore;
    this.snapshotStore = snapshotStore;
    this.blobStore = blobStore;
    this.statsCollector = statsCollector;

    // Thread pool on which bundles are assembled
    this.executor = executor;
  }

  /**
   * Discovers, instantiates and initializes all content generators and creates the shared redactor.
   *
   * Generator class names are read, one per line, from the classpath resource named by
   * SupportBundleContentGeneratorProcessor.RESOURCE_NAME. A class that can't be loaded, lacks the
   * BundleContentGeneratorDef annotation, reuses an already registered id, or fails to instantiate or
   * initialize is logged and skipped; the remaining classes are still processed. Registered generators
   * are then sorted by their declared order.
   *
   * If the redactor configuration can't be read, an empty redactor is used instead.
   */
  @Override
  protected void initTask() {
    this.generators = new LinkedHashMap<>();
    try {
      discoverGenerators();
    } catch (Exception e) {
      LOG.error("Was not able to initialize support bundle generator classes.", e);
    }

    // We need to sort all generators by their described order
    this.generators = generators.entrySet().stream()
        .sorted(Comparator.comparingInt(a -> a.getKey().getOrder()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> b, LinkedHashMap::new));

    // Create shared instance of redactor
    try {
      redactor = StringRedactor.createFromJsonFile(runtimeInfo.getConfigDir() + "/" + Constants.REDACTOR_CONFIG);
    } catch (IOException e) {
      LOG.error("Can't load redactor configuration, bundles will not be redacted", e);
      redactor = StringRedactor.createEmpty();
    }
  }

  /**
   * Reads the list of generator class names from the classpath and registers each of them.
   *
   * The resource stream is always closed, even when reading fails part way through.
   */
  private void discoverGenerators() throws IOException {
    InputStream generatorResource = Thread.currentThread().getContextClassLoader()
        .getResourceAsStream(SupportBundleContentGeneratorProcessor.RESOURCE_NAME);
    if(generatorResource == null) {
      // getResourceAsStream() signals a missing resource with null rather than with an exception
      LOG.error(
        "Support bundle generator list {} was not found on the classpath.",
        SupportBundleContentGeneratorProcessor.RESOURCE_NAME
      );
      return;
    }

    Set<String> ids = new HashSet<>();
    try(BufferedReader reader = new BufferedReader(new InputStreamReader(generatorResource, StandardCharsets.UTF_8))) {
      String className;
      while((className = reader.readLine()) != null) {
        // Tolerate stray empty lines in the generated resource
        if(className.trim().isEmpty()) {
          continue;
        }
        registerGenerator(className, ids);
      }
    }
  }

  /**
   * Loads, validates, instantiates and initializes a single generator and adds it to the generators map.
   *
   * Every failure is logged and causes only this generator to be skipped.
   *
   * @param className fully qualified name of the generator class
   * @param ids ids registered so far; the id of a successfully validated generator is added to it
   */
  private void registerGenerator(String className, Set<String> ids) {
    Class<?> loadedClass;
    try {
      loadedClass = Class.forName(className);
    } catch (ClassNotFoundException e) {
      // One broken entry must not prevent the remaining generators from being registered
      LOG.error("Can't load generator class {}", className, e);
      return;
    }

    // A class of the wrong type is rejected below, when its instance is assigned to BundleContentGenerator
    @SuppressWarnings("unchecked")
    Class<? extends BundleContentGenerator> bundleClass = (Class<? extends BundleContentGenerator>) loadedClass;

    BundleContentGeneratorDef def = bundleClass.getAnnotation(BundleContentGeneratorDef.class);
    if(def == null) {
      LOG.error("Bundle creator class {} is missing required annotation", bundleClass.getName());
      return;
    }

    // The annotation may override the default id, which is the simple class name
    String id = bundleClass.getSimpleName();
    if(!def.id().isEmpty()) {
      id = def.id();
    }

    if(!ids.add(id)) {
      LOG.error("Ignoring duplicate id {} for generator {}.", id, bundleClass.getName());
      return;
    }

    BundleContentGenerator generator;
    try {
      LOG.debug("Generating new instance of {}", bundleClass.getName());
      generator = bundleClass.newInstance();
    } catch (Throwable t) {
      LOG.error("Can't create instance of {}", bundleClass.getName(), t);
      return;
    }

    // Initialize the generator
    try {
      LOG.debug("Initializing generator {}", bundleClass.getName());
      generator.init(this);
    } catch (Throwable t) {
      LOG.error("Can't initialize generator {}", bundleClass.getName(), t);
      return;
    }

    // We have all the metadata and an instance of the actual generator
    BundleContentGeneratorDefinition definition = new BundleContentGeneratorDefinition(
      bundleClass,
      def.name(),
      id,
      def.description(),
      def.version(),
      def.enabledByDefault(),
      def.order()
    );

    generators.put(definition, generator);
  }

  /**
   * Destroys every registered generator.
   *
   * A generator that fails to shut down is logged and does not stop the remaining ones from being destroyed.
   */
  @Override
  protected void stopTask() {
    LOG.info("Stopping Support Bundle Manager");
    generators.forEach((definition, generator) -> {
      String generatorId = definition.getId();

      try {
        LOG.debug("Destroying generator {}", generatorId);
        generator.destroy(this);
      } catch (Throwable t) {
        LOG.error("Can't destroy generator {}", generatorId, t);
      }
    });
  }

  /**
   * Returns the metadata of all registered content generators.
   *
   * The result is a freshly allocated list, so changing it does not affect the registered generators.
   */
  public List<BundleContentGeneratorDefinition> getContentDefinitions() {
    return generators.keySet().stream().collect(Collectors.toCollection(ArrayList::new));
  }

  /**
   * Starts generating a new support bundle and returns a handle from which it can be read.
   *
   * The zip content is produced on the executor and piped into the input stream carried by the returned
   * bundle, so the caller has to consume that stream for the generation to make progress.
   *
   * @param generatorNames ids of generators to run; when empty, the generators enabled by default are run
   * @param bundleType type of the bundle, which drives its naming and metadata
   * @return bundle key, bundle name and the stream with the zipped content
   * @throws IOException if the pipe between the generating thread and the caller can't be set up
   */
  public SupportBundle generateNewBundle(List<String> generatorNames, BundleType bundleType) throws IOException {
    // Resolve the naming before any work is scheduled: it can fail (e.g. unreadable customer id file)
    // and a task that is already writing into a pipe nobody reads would block its thread forever.
    String bundleName = generateBundleName(bundleType);
    String bundleKey = generateBundleDate(bundleType) + "/" + bundleName;

    PipedInputStream inputStream = new PipedInputStream();
    PipedOutputStream outputStream = new PipedOutputStream(inputStream);
    ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream);

    try {
      executor.submit(() -> generateNewBundleInternal(generatorNames, bundleType, zipOutputStream));
    } catch (RuntimeException e) {
      // Submission failed, so the task that normally closes the zip stream will not do it;
      // release both ends of the pipe here.
      try {
        zipOutputStream.close();
      } catch (IOException closeException) {
        e.addSuppressed(closeException);
      }
      try {
        inputStream.close();
      } catch (IOException closeException) {
        e.addSuppressed(closeException);
      }
      throw e;
    }

    return new SupportBundle(
      bundleKey,
      bundleName,
      inputStream
    );
  }

  /**
   * Returns the customer id stored in the customer id file inside the data directory.
   *
   * The default customer id is returned when the file does not exist, is empty, or its first line is blank.
   *
   * @throws RuntimeException if the file exists but can't be read
   */
  private String getCustomerId() {
    File customerIdFile = new File(runtimeInfo.getDataDir(), Constants.CUSTOMER_ID_FILE);
    if(!customerIdFile.exists()) {
      return Constants.DEFAULT_CUSTOMER_ID;
    }

    String firstLine;
    try {
      firstLine = com.google.common.io.Files.readFirstLine(customerIdFile, StandardCharsets.UTF_8);
    } catch (IOException ex) {
      throw new RuntimeException(Utils.format("Could not read customer ID file '{}': {}", customerIdFile, ex.toString()), ex);
    }

    // readFirstLine() returns null for an empty file; treat that the same way as a missing file
    if(firstLine == null) {
      return Constants.DEFAULT_CUSTOMER_ID;
    }

    String customerId = firstLine.trim();
    return customerId.isEmpty() ? Constants.DEFAULT_CUSTOMER_ID : customerId;
  }

  /**
   * Builds the date part of a bundle key: the path prefix of the bundle type followed by today's date
   * formatted as yyyy-MM-dd.
   */
  private String generateBundleDate(BundleType bundleType) {
    String today = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
    return bundleType.getPathPrefix() + today;
  }

  /**
   * Builds the file name of a bundle.
   *
   * The name consists of the name prefix of the bundle type, an identity part, the current timestamp and
   * the ".zip" extension. The identity part is a random UUID when the bundle type anonymizes metadata,
   * otherwise it is the customer id and the data collector id joined by an underscore.
   */
  private String generateBundleName(BundleType bundle) {
    StringBuilder name = new StringBuilder(bundle.getNamePrefix());

    String identity = bundle.isAnonymizeMetadata()
        ? UUID.randomUUID().toString()
        : getCustomerId() + "_" + runtimeInfo.getId();

    String timestamp = new SimpleDateFormat("yyyy-MM-dd_HH.mm.ss").format(new Date());

    return name
        .append(identity)
        .append("_")
        .append(timestamp)
        .append(".zip")
        .toString();
  }

  /**
   * Writes the whole bundle into the given zip stream and closes the stream afterwards.
   *
   * Each selected generator writes its content first. A generator that throws is recorded as failed and
   * its possibly unfinished entry is closed, after which the next generator runs. The bundle ends with
   * generators.properties, failed_generators.properties and, unless the bundle type anonymizes metadata,
   * metadata.properties. Any other failure is logged and ends the generation; the stream is closed in
   * every case.
   */
  private void generateNewBundleInternal(
      List<String> generatorNames,
      BundleType bundleType,
      ZipOutputStream zipStream
  ) {
    try {
      Properties succeeded = new Properties();
      Properties failed = new Properties();

      // There is only a handful of generators, so simply walk through all of them
      for(Map.Entry<BundleContentGeneratorDefinition, BundleContentGenerator> registered: generators.entrySet()) {
        BundleContentGeneratorDefinition definition = registered.getKey();
        if(!isGeneratorSelected(generatorNames, definition)) {
          continue;
        }

        String generatorClassName = definition.getKlass().getName();
        BundleWriterImpl writer = new BundleWriterImpl(generatorClassName, redactor, zipStream);

        try {
          LOG.debug("Generating content with {} generator", generatorClassName);
          registered.getValue().generateContent(this, writer);
          succeeded.put(generatorClassName, String.valueOf(definition.getVersion()));
        } catch (Throwable t) {
          LOG.error("Generator {} failed", definition.getName(), t);
          failed.put(generatorClassName, String.valueOf(definition.getVersion()));
          // The generator might have died in the middle of a file
          writer.ensureEndOfFile();
        }
      }

      writePropertiesEntry(zipStream, "generators.properties", succeeded);
      writePropertiesEntry(zipStream, "failed_generators.properties", failed);

      // Anonymized bundles carry no metadata.properties at all
      if(!bundleType.isAnonymizeMetadata()) {
        zipStream.putNextEntry(new ZipEntry("metadata.properties"));
        getMetadata(bundleType).store(zipStream, "");
        zipStream.closeEntry();
      }

    } catch (Exception e) {
      LOG.error("Failed to generate resource bundle", e);
    } finally {
      // Closing the zip stream is what tells the reading side that the bundle is complete
      try {
        zipStream.close();
      } catch (IOException e) {
        LOG.error("Failed to finish generating the bundle", e);
      }
    }
  }

  /**
   * Decides whether a generator takes part in the bundle: with no explicit names only generators enabled
   * by default run, otherwise only those whose id was named.
   */
  private static boolean isGeneratorSelected(List<String> generatorNames, BundleContentGeneratorDefinition definition) {
    return generatorNames.isEmpty()
        ? definition.isEnabledByDefault()
        : generatorNames.contains(definition.getId());
  }

  /**
   * Stores the given properties as one complete zip entry.
   */
  private static void writePropertiesEntry(
      ZipOutputStream zipStream,
      String entryName,
      Properties properties
  ) throws IOException {
    zipStream.putNextEntry(new ZipEntry(entryName));
    properties.store(zipStream, "");
    zipStream.closeEntry();
  }

  /**
   * Assembles the content of metadata.properties: the metadata format version, data collector version
   * and id, whether ACLs are enabled, the customer id and the tag of the bundle type.
   */
  private Properties getMetadata(BundleType bundleType) {
    Properties props = new Properties();

    // Version of this metadata layout
    props.put("version", "1");

    // Description of this data collector instance
    props.put("sdc.version", buildInfo.getVersion());
    props.put("sdc.id", runtimeInfo.getId());
    props.put("sdc.acl.enabled", String.valueOf(runtimeInfo.isAclEnabled()));

    // Who the bundle comes from and what kind of bundle it is
    props.put("customer.id", getCustomerId());
    props.put("bundle.type", bundleType.getTag());

    return props;
  }

  /** Returns the configuration this manager was constructed with. */
  @Override
  public Configuration getConfiguration() {
    return this.configuration;
  }

  /** Returns the build information this manager was constructed with. */
  @Override
  public BuildInfo getBuildInfo() {
    return this.buildInfo;
  }

  /** Returns the runtime information this manager was constructed with. */
  @Override
  public RuntimeInfo getRuntimeInfo() {
    return this.runtimeInfo;
  }

  /** Returns the pipeline store this manager was constructed with. */
  @Override
  public PipelineStoreTask getPipelineStore() {
    return this.pipelineStore;
  }

  /** Returns the pipeline state store this manager was constructed with. */
  @Override
  public PipelineStateStore getPipelineStateStore() {
    return this.stateStore;
  }

  /** Returns the snapshot store this manager was constructed with. */
  @Override
  public SnapshotStore getSnapshotStore() {
    return this.snapshotStore;
  }

  /** Returns the blob store this manager was constructed with. */
  @Override
  public BlobStoreTask getBlobStore() {
    return this.blobStore;
  }

  /** Returns the stats collector this manager was constructed with. */
  @Override
  public StatsCollector getStatsCollector() {
    return this.statsCollector;
  }

  /**
   * BundleWriter that places everything written through it under one directory of the shared zip stream
   * and passes text through the redactor before it reaches the archive.
   *
   * Text is always encoded as UTF-8, independently of the platform default charset.
   */
  private static class BundleWriterImpl implements BundleWriter {

    /**
     * Separator used inside zip entry names. The ZIP format requires forward slashes, so the platform
     * specific File.separator must not be used here.
     */
    private static final String ENTRY_SEPARATOR = "/";

    /** True while an entry opened by markStartOfFile() has not been closed by markEndOfFile(). */
    private boolean insideFile;
    /** Directory (including the trailing separator) prepended to every entry name. */
    private final String prefix;
    private final StringRedactor redactor;
    private final ZipOutputStream zipOutputStream;

    /**
     * @param prefix directory inside the archive under which all entries of this writer are created
     * @param redactor redactor applied to all text written through this writer
     * @param outputStream shared zip stream; it is never closed by this writer
     */
    public BundleWriterImpl(
      String prefix,
      StringRedactor redactor,
      ZipOutputStream outputStream
    ) {
      this.prefix = prefix + ENTRY_SEPARATOR;
      this.redactor = redactor;
      this.zipOutputStream = outputStream;
      this.insideFile = false;
    }

    /** Opens a new zip entry with the given name below this writer's directory. */
    @Override
    public void markStartOfFile(String name) throws IOException {
      zipOutputStream.putNextEntry(new ZipEntry(prefix + name));
      insideFile = true;
    }

    /** Closes the currently open zip entry. */
    @Override
    public void markEndOfFile() throws IOException {
      zipOutputStream.closeEntry();
      insideFile = false;
    }

    /** Closes the current zip entry if one is still open; does nothing otherwise. */
    public void ensureEndOfFile() throws IOException {
      if(insideFile) {
        markEndOfFile();
      }
    }

    /**
     * Redacts the string and writes it into the current entry.
     *
     * @param string text to write
     * @param ln whether a line feed should follow the text
     */
    public void writeInternal(String string, boolean ln) throws IOException {
      zipOutputStream.write(redactor.redact(string).getBytes(StandardCharsets.UTF_8));
      if(ln) {
        zipOutputStream.write('\n');
      }
    }

    /** Writes redacted text into the current entry. */
    @Override
    public void write(String str) throws IOException {
      writeInternal(str, false);
    }

    /** Writes redacted text followed by a line feed into the current entry. */
    @Override
    public void writeLn(String str) throws IOException {
      writeInternal(str, true);
    }

    /**
     * Writes the properties as key=value lines into a new entry. Keys and values have to be strings.
     */
    @Override
    public void write(String fileName, Properties properties) throws IOException {
      markStartOfFile(fileName);

      for(Map.Entry<Object, Object> entry: properties.entrySet()) {
        String key = (String) entry.getKey();
        String value = (String) entry.getValue();

        writeLn(Utils.format("{}={}", key, value));
      }

      markEndOfFile();
    }

    /**
     * Copies the stream line by line into a new entry and closes the stream.
     *
     * The stream is decoded as UTF-8, matching the way files given as a Path are read.
     */
    @Override
    public void write(String fileName, InputStream inputStream) throws IOException {
      try(BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
        copyReader(reader, fileName, 0);
      }
    }

    /** Copies the whole file into a new entry inside the given directory. */
    @Override
    public void write(String dir, Path path) throws IOException {
      write(dir, path, 0);
    }

    /**
     * Copies the file, starting at the given offset, into a new entry inside the given directory.
     *
     * A file that does not exist is silently skipped.
     */
    @Override
    public void write(String dir, Path path, long startOffset) throws IOException {
      // We're not interested in serializing non-existing files
      if(!Files.exists(path)) {
        return;
      }

      try (BufferedReader reader = Files.newBufferedReader(path)) {
        copyReader(reader, dir + ENTRY_SEPARATOR + path.getFileName(), startOffset);
      }
    }

    /** Serializes the object to JSON and writes it into a new entry. */
    @Override
    public void writeJson(String fileName, Object object) throws IOException {
      ObjectMapper objectMapper = ObjectMapperFactory.get();
      markStartOfFile(fileName);
      write(objectMapper.writeValueAsString(object));
      markEndOfFile();
    }

    /**
     * Opens a new entry and returns a JSON generator writing into it.
     *
     * The entry is left open; it is ended by a later markEndOfFile()/ensureEndOfFile() call or when
     * the next entry is started.
     */
    @Override
    public JsonGenerator createGenerator(String fileName) throws IOException {
      markStartOfFile(fileName);
      return new JsonFactory().createGenerator(new JsonGeneratorOutputStream(zipOutputStream, redactor));
    }

    /**
     * Copies the reader line by line into a new entry.
     *
     * NOTE(review): the offset is applied with Reader.skip() and therefore counts characters, not bytes;
     * confirm that this is what callers passing a start offset expect.
     */
    private void copyReader(BufferedReader reader, String path, long startOffset) throws IOException {
      markStartOfFile(path);

      if(startOffset > 0) {
        reader.skip(startOffset);
      }

      String line;
      while ((line = reader.readLine()) != null) {
        writeLn(line);
      }

      markEndOfFile();
    }
  }

  /**
   * Output stream that buffers incoming bytes up to the end of a line, redacts the line and only then
   * writes it into the zip stream.
   *
   * Bytes are decoded and re-encoded as UTF-8, which is the encoding Jackson uses when a generator is
   * created on top of an OutputStream. Splitting on the line feed byte is safe for UTF-8 because that
   * byte never occurs inside a multi-byte sequence.
   */
  private static class JsonGeneratorOutputStream extends OutputStream {

    /** Bytes of the line that is currently being assembled. */
    private final ByteArrayOutputStream lineBuffer;
    private final ZipOutputStream zipOutputStream;
    private final StringRedactor redactor;

    public JsonGeneratorOutputStream(ZipOutputStream stream, StringRedactor redactor) {
      this.lineBuffer = new ByteArrayOutputStream();
      this.zipOutputStream = stream;
      this.redactor = redactor;
    }

    @Override
    public void write(int b) throws IOException {
      // Add the byte to the line
      lineBuffer.write(b);

      // The line is complete, write the data out
      if(b == '\n') {
        writeOut();
      }
    }

    /** Redacts the buffered bytes, writes them into the zip stream and empties the buffer. */
    private void writeOut() throws IOException {
      if(lineBuffer.size() == 0) {
        return;
      }

      String line = new String(lineBuffer.toByteArray(), StandardCharsets.UTF_8);
      zipOutputStream.write(redactor.redact(line).getBytes(StandardCharsets.UTF_8));

      lineBuffer.reset();
    }

    @Override
    public void close() throws IOException {
      // Write down remaining bytes, but do not close the underlying zip stream
      writeOut();
    }
  }
}
